In [2]:
import numpy as np
from numpy import ma
from pyproj import Geod
from bs4 import BeautifulSoup
import requests
from metpy.io.nexrad import Level3File
from metpy.plots import ctables
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
import os, tarfile, wget, re

In [6]:
tarfile_list = requests.get('http://www1.ncdc.noaa.gov/pub/has/HAS010805581/')
tar_list_soup = BeautifulSoup(tarfile_list.content, 'html.parser')
filename_list = tar_list_soup.select('td a')
noaa_filenames = list()

for filename in filename_list:
    noaa_filenames.append(filename['href'])

# Remove the first entry because it will always be the parent directory link
noaa_filenames = noaa_filenames[1:]
print(len(noaa_filenames))
print(noaa_filenames[:5])


974
['NWS_NEXRAD_NXL3_KLOT_20140101000000_20140101235959.tar.gz', 'NWS_NEXRAD_NXL3_KLOT_20140102000000_20140102235959.tar.gz', 'NWS_NEXRAD_NXL3_KLOT_20140103000000_20140103235959.tar.gz', 'NWS_NEXRAD_NXL3_KLOT_20140104000000_20140104235959.tar.gz', 'NWS_NEXRAD_NXL3_KLOT_20140105000000_20140105235959.tar.gz']
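Since this listing drives the batch downloads later, here is a minimal sketch of pulling each archive with the wget module. The base_url variable, the nexrad_data/ output directory, and the skip-if-already-present check are assumptions for illustration, not part of the original run.

In [ ]:
# Sketch: batch-download the scraped archives (assumed paths, see above)
base_url = 'http://www1.ncdc.noaa.gov/pub/has/HAS010805581/'
os.makedirs('nexrad_data', exist_ok=True)

for fname in noaa_filenames:
    dest = os.path.join('nexrad_data', fname)
    if not os.path.exists(dest):
        # wget shows a progress bar, which helps with these large files
        wget.download(base_url + fname, out=dest)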

In [28]:
def extract_data(file_obj):
    # Pull the data out of the file object
    f = Level3File(file_obj)
    datadict = f.sym_block[0][0]
    # Files with no precipitation have no 'data' key
    if 'data' not in datadict:
        return None, None, None

    # Turn into a masked array and mask out 0 (no precipitation)
    data = ma.array(datadict['data'])
    data[data==0] = ma.masked

    az = np.array(datadict['start_az'] + [datadict['end_az'][-1]])
    rng = np.linspace(0, f.max_range, data.shape[-1] + 1)

    # Data from MetPy needs to be converted to latitude and longitude coordinates
    g = Geod(ellps='clrk66')
    center_lat = np.ones([len(az),len(rng)])*f.lat
    center_lon = np.ones([len(az),len(rng)])*f.lon

    az2D = np.ones_like(center_lat)*az[:,None]
    rng2D = np.ones_like(center_lat)*np.transpose(rng[:,None])*1000
    lon,lat,back = g.fwd(center_lon,center_lat,az2D,rng2D)
    
    return lon, lat, data

def unstack_data(lon, lat, data):
    lat_df = pd.DataFrame(lat)
    lon_df = pd.DataFrame(lon)
    data_df = pd.DataFrame(data)
    
    lon_stack = lon_df.stack().reset_index()
    lon_stack = lon_stack.rename(columns={'level_0': 'x', 'level_1': 'y', 0: 'lon'})
    lat_stack = lat_df.stack().reset_index()
    lat_stack = lat_stack.rename(columns={'level_0': 'x', 'level_1': 'y', 0: 'lat'})
    coord_merge = pd.merge(lat_stack, lon_stack, on=['x', 'y'])
    
    data_stack = data_df.stack().reset_index()
    data_stack = data_stack.rename(columns={'level_0': 'x', 'level_1': 'y', 0: 'precip'})
    merged_data = pd.merge(coord_merge, data_stack, on=['x', 'y'], how='left')[['lat','lon','precip']]
    noaa_df = merged_data.dropna()
    return noaa_df
    
def spatial_join(noaa_df, gpd_file, group_col, file_time):
    geo_df = gpd.read_file(gpd_file)
    # Important that the crs matches the point coordinates (WGS84 lat/lon)
    crs = {'init':'epsg:4326'}
    geometry = [Point(xy) for xy in zip(noaa_df.lon, noaa_df.lat)]
    noaa_geo = gpd.GeoDataFrame(noaa_df, crs=crs, geometry=geometry)
    merged_noaa = gpd.tools.sjoin(noaa_geo, geo_df, how='right', op='within').reset_index()
    
    noaa_grouped = merged_noaa.groupby([group_col])['precip'].mean().reset_index()
    noaa_grouped[group_col] = noaa_grouped[group_col].astype(int)
    noaa_grouped.fillna(value=0, inplace=True)
    noaa_grouped.sort_values(by=group_col, inplace=True)
    noaa_grouped.to_csv('data/noaa_processed/{}_{}.csv'.format(group_col, file_time), index=False)

In [30]:
noaa_files = list()

tar = tarfile.open('nexrad_data/NWS_NEXRAD_NXL3_KLOT_20160830000000_20160830235959.tar.gz','r:gz')
for t in tar.getnames():
    if re.match(r'.*N1PLOT.*', t):
        noaa_files.append(t)
print(noaa_files[:5])
rand_noaa = tar.extractfile(noaa_files[4])
file_time = noaa_files[4].split('_')[-1]
lon, lat, data = extract_data(rand_noaa)

if lon is not None:
    processed_df = unstack_data(lon, lat, data)
    spatial_join(processed_df, 'data/chicago_wards.geojson', 'ward', file_time)
else:
    print('returned None')


['KLOT_SDUS33_N1PLOT_201608300002', 'KLOT_SDUS33_N1PLOT_201608300007', 'KLOT_SDUS33_N1PLOT_201608300013', 'KLOT_SDUS33_N1PLOT_201608300019', 'KLOT_SDUS33_N1PLOT_201608300025']

In [2]:
# Using wget module to make long-running downloads easier
#file_path = wget.download('http://www1.ncdc.noaa.gov/pub/has/HAS010805581/NWS_NEXRAD_NXL3_KLOT_20160901000000_20160901235959.tar.gz',
#                          out='nexrad_data')


100% [......................................................................] 252090873 / 252090873

NEXRAD ETL

NEXRAD data provides precipitation estimates along with location at a high level of detail. The plan is to use the One Hour Precipitation (DAA) Level III product, but other options are listed here: NEXRAD Data Products. The ETL process currently looks like this:

  • Order pre-processed NEXRAD Level III precipitation data through the NOAA Climate Data Online Search
  • Pull batch downloads from an FTP or HTTPS location like http://www1.ncdc.noaa.gov/pub/has/{DOWNLOAD_ID}
  • Because the downloads are large gzipped tar files, extract just the members for the desired data product (DAA in this case)
  • Read the NEXRAD data in directly with MetPy
  • Reduce the precipitation detail into a DataFrame with x, y, lat, lon, and precip columns (and handle files that have no precipitation)
  • Use GeoPandas to spatially join the points onto a larger polygon dataset, setting each polygon's precip value to the mean of all the points within it
  • Combine the many NEXRAD DataFrames this produces (see the sketch after this list)
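A rough sketch of a driver loop tying these steps together, using the extract_data, unstack_data, and spatial_join functions defined above. The download ID handling, local paths, product regex, and the final concat of the per-file CSVs are illustrative assumptions, not a tested implementation.

In [ ]:
# Sketch: full ETL loop over one download ID (assumed paths/names, see above)
def run_etl(download_id, product_regex=r'.*DAALOT.*'):
    listing = requests.get('http://www1.ncdc.noaa.gov/pub/has/{}/'.format(download_id))
    soup = BeautifulSoup(listing.content, 'html.parser')
    archives = [a['href'] for a in soup.select('td a')][1:]  # skip parent dir link

    for archive in archives:
        path = wget.download('http://www1.ncdc.noaa.gov/pub/has/{}/{}'.format(download_id, archive),
                             out='nexrad_data')
        tar = tarfile.open(path, 'r:gz')
        for name in tar.getnames():
            if not re.match(product_regex, name):
                continue
            lon, lat, data = extract_data(tar.extractfile(name))
            if lon is None:
                continue  # file had no precipitation
            spatial_join(unstack_data(lon, lat, data), 'data/chicago_wards.geojson',
                         'ward', name.split('_')[-1])
        tar.close()
        os.remove(path)  # archives are large, so delete once processed

    # Combine the per-file CSVs written by spatial_join into one DataFrame
    processed = os.listdir('data/noaa_processed')
    return pd.concat([pd.read_csv(os.path.join('data/noaa_processed', c)) for c in processed])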

Additional Details

  • Units appear to be millimeters, judging by the description in this paper using Quantitative Precipitation Estimates from NEXRAD: DuPage County Modeling

In [6]:
# We don't need most of the files in the archive, so use the tarfile module to pull only the ones necessary
# 
# Once the DAA files are extracted, the rest of the archive can be deleted
daa_files = list()

tar = tarfile.open('nexrad_data/NWS_NEXRAD_NXL3_KLOT_20160901000000_20160901235959.tar.gz',"r:gz")
for t in tar.getnames():
    if re.match(r'.*DAALOT.*', t):
        daa_files.append(t)
        
print(daa_files[20:30])
print(daa_files[29])


['KLOT_SDUS83_DAALOT_201609010322', 'KLOT_SDUS83_DAALOT_201609010331', 'KLOT_SDUS83_DAALOT_201609010341', 'KLOT_SDUS83_DAALOT_201609010351', 'KLOT_SDUS83_DAALOT_201609010401', 'KLOT_SDUS83_DAALOT_201609010410', 'KLOT_SDUS83_DAALOT_201609010420', 'KLOT_SDUS83_DAALOT_201609010430', 'KLOT_SDUS83_DAALOT_201609010440', 'KLOT_SDUS83_DAALOT_201609010449']
KLOT_SDUS83_DAALOT_201609010459

In [7]:
# Random DAA file that happens to have data
rand_daa = tar.extractfile(daa_files[29])
f = Level3File(rand_daa)

# Pull the data out of the file object
datadict = f.sym_block[0][0]
data = ma.array(datadict['data'])
data[data==0] = ma.masked

az = np.array(datadict['start_az'] + [datadict['end_az'][-1]])
rng = np.linspace(0, f.max_range, data.shape[-1] + 1)

# Data from MetPy needs to be converted to latitude and longitude coordinates
g = Geod(ellps='clrk66')
center_lat = np.ones([len(az),len(rng)])*f.lat
center_lon = np.ones([len(az),len(rng)])*f.lon

az2D = np.ones_like(center_lat)*az[:,None]
rng2D = np.ones_like(center_lat)*np.transpose(rng[:,None])*1000
lon,lat,back = g.fwd(center_lon,center_lat,az2D,rng2D)

In [8]:
# Once the data is returned, it can be converted into DataFrames for easier manipulation
lat_df = pd.DataFrame(lat)
lon_df = pd.DataFrame(lon)
data_df = pd.DataFrame(data)
print(lat_df.shape)
lat_df.head()


(361, 921)
Out[8]:
0 1 2 3 4 5 6 7 8 9 ... 911 912 913 914 915 916 917 918 919 920
0 41.604 41.606251 41.608502 41.610753 41.613004 41.615255 41.617506 41.619757 41.622008 41.624259 ... 43.654248 43.656498 43.658748 43.660998 43.663249 43.665499 43.667749 43.669999 43.672249 43.674499
1 41.604 41.606251 41.608501 41.610752 41.613002 41.615253 41.617504 41.619754 41.622005 41.624255 ... 43.653925 43.656175 43.658425 43.660675 43.662925 43.665174 43.667424 43.669674 43.671924 43.674173
2 41.604 41.606250 41.608499 41.610749 41.612998 41.615248 41.617497 41.619747 41.621997 41.624246 ... 43.652958 43.655207 43.657455 43.659704 43.661953 43.664201 43.666450 43.668699 43.670947 43.673196
3 41.604 41.606248 41.608496 41.610744 41.612991 41.615239 41.617487 41.619735 41.621983 41.624231 ... 43.651346 43.653593 43.655839 43.658086 43.660333 43.662580 43.664827 43.667074 43.669320 43.671567
4 41.604 41.606245 41.608491 41.610736 41.612982 41.615227 41.617473 41.619718 41.621964 41.624209 ... 43.649089 43.651334 43.653578 43.655822 43.658066 43.660311 43.662555 43.664799 43.667044 43.669288

5 rows × 921 columns


In [9]:
# Stack the DataFrames so we're dealing with more rows than columns
lon_stack = lon_df.stack().reset_index()
lon_stack = lon_stack.rename(columns={'level_0': 'x', 'level_1': 'y', 0: 'lon'})
lat_stack = lat_df.stack().reset_index()
lat_stack = lat_stack.rename(columns={'level_0': 'x', 'level_1': 'y', 0: 'lat'})
print(lon_stack.shape)
lon_stack.head()


(332481, 3)
Out[9]:
x y lon
0 0 0 -88.085
1 0 1 -88.085
2 0 2 -88.085
3 0 3 -88.085
4 0 4 -88.085

In [10]:
# Merge the lat and lon DataFrames on the x and y indices from the original matrix so data_df can be joined on them
coord_merge = pd.merge(lat_stack, lon_stack, on=['x', 'y'])
print(coord_merge.shape)
coord_merge.head()


(332481, 4)
Out[10]:
x y lat lon
0 0 0 41.604000 -88.085
1 0 1 41.606251 -88.085
2 0 2 41.608502 -88.085
3 0 3 41.610753 -88.085
4 0 4 41.613004 -88.085

In [11]:
# Do the same with the precipitation DataFrame, which will then be merged back onto the coordinates
data_stack = data_df.stack().reset_index()
data_stack = data_stack.rename(columns={'level_0': 'x', 'level_1': 'y', 0: 'precip'})
print(data_stack.shape)
data_stack.head()


(21802, 3)
Out[11]:
x y precip
0 0 26 1.0
1 0 27 1.0
2 0 67 1.0
3 0 68 1.0
4 0 69 1.0

In [12]:
# This file has higher-detail precipitation data, stored as raw 8-bit data levels on a scale of 0 to 255
data_stack['precip'].unique()


Out[12]:
array([   1.,    2.,    3.,    7.,    4.,    5.,    6.,   31.,   27.,
         14.,   61.,   66.,   67.,   82.,   91.,  106.,  146.,    8.,
          9.,   12.,  150.,   49.,   20.,   38.,   24.,   19.,   16.,
         30.,   48.,   45.,   10.,   21.,   22.,   43.,   47.,   53.,
         15.,   17.,   11.,   35.,   60.,   33.,   13.,   23.,   78.,
        255.,  178.,  103.,   54.,   18.,   25.,   29.,   46.,   62.,
         26.,  142.,  176.,  202.,  217.,  193.,  111.,   28.,   39.,
         41.,   56.,   86.,   69.,   63.,   34.,   52.,   71.,   32.])
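These raw levels can presumably be mapped to physical values through the product's threshold/scale information. A minimal sketch, assuming Level3File.map_data applies the product's scale and offset the way MetPy's Level III examples do; worth verifying against the DAA product spec before trusting the units.

In [ ]:
# Sketch: decode raw 0-255 data levels to physical values (assumed API behavior)
mapped = f.map_data(datadict['data'])
print(np.nanmax(mapped))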

In [13]:
# Merge coordinates and precipitation data together
merged_data = pd.merge(coord_merge, data_stack, on=['x', 'y'], how='left')
print(merged_data.shape)
merged_data.head()


(332481, 5)
Out[13]:
x y lat lon precip
0 0 0 41.604000 -88.085 NaN
1 0 1 41.606251 -88.085 NaN
2 0 2 41.608502 -88.085 NaN
3 0 3 41.610753 -88.085 NaN
4 0 4 41.613004 -88.085 NaN

In [14]:
# To reduce the data size, drop the many NaN rows where there was no precipitation
daa_df = merged_data.dropna()
print(daa_df.shape)
daa_df.head()


(21802, 5)
Out[14]:
x y lat lon precip
26 0 26 41.662525 -88.085 1.0
27 0 27 41.664775 -88.085 1.0
67 0 67 41.754812 -88.085 1.0
68 0 68 41.757063 -88.085 1.0
69 0 69 41.759314 -88.085 1.0

In [15]:
chi_wards = gpd.read_file('data/chicago_wards.geojson')
print(type(chi_wards))
chi_wards.head()


<class 'geopandas.geodataframe.GeoDataFrame'>
Out[15]:
geometry ward
0 (POLYGON ((-87.69623470134458 41.8575549523838... 12
1 (POLYGON ((-87.66288923669032 41.7988380986824... 16
2 (POLYGON ((-87.69817510963803 41.8172944075599... 15
3 (POLYGON ((-87.65524133440029 41.8088331618279... 20
4 (POLYGON ((-87.66420403810295 42.0212615805274... 49

In [16]:
# Convert lat and lon into shapely Point objects and make into GeoDataFrame
# Important that the crs values are the same
crs = {'init':'epsg:4326'}
geometry = [Point(xy) for xy in zip(daa_df.lon, daa_df.lat)]
daa_geo = gpd.GeoDataFrame(daa_df, crs=crs, geometry=geometry)
daa_geo.head()


Out[16]:
x y lat lon precip geometry
26 0 26 41.662525 -88.085 1.0 POINT (-88.08500000000001 41.6625245254508)
27 0 27 41.664775 -88.085 1.0 POINT (-88.08500000000001 41.66477545666353)
67 0 67 41.754812 -88.085 1.0 POINT (-88.08500000000001 41.75481197165804)
68 0 68 41.757063 -88.085 1.0 POINT (-88.08500000000001 41.75706286619285)
69 0 69 41.759314 -88.085 1.0 POINT (-88.08500000000001 41.75931375983289)

In [18]:
# Spatial join; op='within' is important for speed, and the right join retains all ward polygons
ward_daa = gpd.tools.sjoin(daa_geo, chi_wards, how='right', op='within')
ward_daa.head()


Out[18]:
x y lat lon precip index_left geometry ward
index_right
0.0 49.0 167.0 41.849989 -87.705588 1.0 45296.0 (POLYGON ((-87.69623470134458 41.8575549523838... 12
0.0 50.0 165.0 41.842103 -87.704549 1.0 46215.0 (POLYGON ((-87.69623470134458 41.8575549523838... 12
0.0 49.0 168.0 41.851458 -87.703308 1.0 45297.0 (POLYGON ((-87.69623470134458 41.8575549523838... 12
0.0 50.0 166.0 41.843542 -87.702234 1.0 46216.0 (POLYGON ((-87.69623470134458 41.8575549523838... 12
0.0 50.0 168.0 41.846420 -87.697605 1.0 46218.0 (POLYGON ((-87.69623470134458 41.8575549523838... 12

In [19]:
ward_daa_df = ward_daa.reset_index()
ward_daa_df.head()


Out[19]:
index_right x y lat lon precip index_left geometry ward
0 0.0 49.0 167.0 41.849989 -87.705588 1.0 45296.0 (POLYGON ((-87.69623470134458 41.8575549523838... 12
1 0.0 50.0 165.0 41.842103 -87.704549 1.0 46215.0 (POLYGON ((-87.69623470134458 41.8575549523838... 12
2 0.0 49.0 168.0 41.851458 -87.703308 1.0 45297.0 (POLYGON ((-87.69623470134458 41.8575549523838... 12
3 0.0 50.0 166.0 41.843542 -87.702234 1.0 46216.0 (POLYGON ((-87.69623470134458 41.8575549523838... 12
4 0.0 50.0 168.0 41.846420 -87.697605 1.0 46218.0 (POLYGON ((-87.69623470134458 41.8575549523838... 12

In [20]:
daa_by_ward = ward_daa_df.groupby(['ward'])['precip'].mean().reset_index()
daa_by_ward['ward'] = daa_by_ward['ward'].astype(int)
daa_by_ward.fillna(value=0, inplace=True)
daa_by_ward.sort_values(by='ward', inplace=True)
daa_by_ward.head()


Out[20]:
ward precip
0 1 1.280000
11 2 1.277778
22 3 0.000000
33 4 1.000000
44 5 0.000000

In [29]:
chi_wards['ward'] = chi_wards['ward'].astype(int)
chi_ward_precip = chi_wards.merge(daa_by_ward, on='ward')
print(chi_ward_precip.dtypes)
print(chi_ward_precip.shape)
chi_ward_precip.head()


geometry     object
ward          int32
precip      float64
dtype: object
(50, 3)
Out[29]:
geometry ward precip
0 (POLYGON ((-87.69623470134458 41.8575549523838... 12 1.25
1 (POLYGON ((-87.66288923669032 41.7988380986824... 16 0.00
2 (POLYGON ((-87.69817510963803 41.8172944075599... 15 0.00
3 (POLYGON ((-87.65524133440029 41.8088331618279... 20 0.00
4 (POLYGON ((-87.66420403810295 42.0212615805274... 49 1.00

In [31]:
# Save to file with units in millimeters
chi_ward_precip.to_file('data/chi_ward_precip_millim.geojson', driver='GeoJSON')

# Converting precip to inches, saving to file
chi_ward_in = chi_ward_precip.copy()
chi_ward_in['precip'] = chi_ward_in['precip'].apply(lambda x: x / 25.4)
chi_ward_in.to_file('data/chi_ward_precip_in.geojson', driver='GeoJSON')

In [ ]:
# When running the full ETL over multiple files, we'll want to delete the tar archives; keeping them for now
# os.remove(file_path)